-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-51119][SQL] Readers on executors resolving EXISTS_DEFAULT should not call catalogs #49840
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leaving some explanations
* | ||
* VisibleForTesting | ||
*/ | ||
def analyzeExistingDefault(field: StructField, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is simpler version of analyze, used for existsDefaultValues (which is called from executors). Make in another method, as we have another may have an opportunity to simplify further, but for now it seems some part of analysis is still needed to resolve some functions like array(). The problematic code of FinishAnalysis was removed though.
/** | ||
* Visible for testing | ||
*/ | ||
def setAnalyzerAndOptimizer(analyzer: Analyzer, optimizer: Optimizer): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its hard to reproduce the issue in unit test, so I end up mocking these members to verify that the catalogs are not called.
let me know, Im totally fine with removing this if we think existing test coverage is ok.
…ld not call catalogs
ccd071e
to
ce67d1d
Compare
ce67d1d
to
2fa0b0b
Compare
Had a chat offline with @cloud-fan who suggest simplifying the analyzeExistence to be just the following bare bones code to resolve functions.
Thanks for the suggestion, this simplifies it a lot. |
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
Outdated
Show resolved
Hide resolved
CatalystSqlParser.parseExpression(sql).transformUp { | ||
case u: UnresolvedFunction => | ||
assert(u.nameParts.length == 1) | ||
assert(!u.isDistinct) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we check other fields in UnresolvedFunction
as well? The SQL string produced by Literal#sql
should not specify any extra fields
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, i added a bunch of asserts. Not too familiar with these flags, please double check
"", field.name, defaultSQL, null) | ||
} | ||
|
||
expr |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we still call coerceDefaultValue
at the end?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It shouldn't be strictly necessary, but would be a no-op on executors, and we'll be safer to have it, so might as well keep it to double-check that the existence default value is the type we expect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for working on this!!
"", field.name, defaultSQL, null) | ||
} | ||
|
||
expr |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It shouldn't be strictly necessary, but would be a no-op on executors, and we'll be safer to have it, so might as well keep it to double-check that the existence default value is the type we expect.
expr match { | ||
case _: ExprLiteral | _: Cast => expr | ||
} | ||
analyzeExistingDefault(field, text) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unit testing idea: we could try to stub out the catalog manager with something that always throws errors, if we wanted to be extra careful. Our DefaultColumnAnalyzer
currently looks like this:
/**
* This is an Analyzer for processing default column values using built-in functions only.
*/
object DefaultColumnAnalyzer extends Analyzer(
new CatalogManager(BuiltInFunctionCatalog, BuiltInFunctionCatalog.v1Catalog)) {
}
We could use a different catalog manager that always throws errors there specifically when we exercise this getExistenceDefaultValues
method. This would make sure we're not inadvertently doing catalog manager lookups there anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM after resolving remaining comments. Thanks again for the fix!
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM. Thank you, @szehon-ho , @cloud-fan , @dtenedor . (Pending CIs).
…ld not call catalogs ### What changes were proposed in this pull request? Simplify the resolution of EXISTS_DEFAULT on ResolveDefaultColumns::getExistenceDefaultValues(), which are called from file readers on executors. ### Why are the changes needed? Spark executors unnecessary contacts catalogs when resolving EXISTS_DEFAULTS (used for default values for existing data) for a column. Detailed explanation: The code path for default values first runs an analysis of the user-provided CURRENT_DEFAULT value for a column (to evaluate functions, etc), and uses the result sql to save as the column's EXISTS_DEFAULT. EXISTS_DEFAULT is then used to avoid having to rewrite existing data using backfill to fill this value in the files. When reading existing files, Spark then attempts to resolve the EXISTS_DEFAULT metadata and use the value for null values it finds in that column. The problem is, this second step on read redundantly runs all the analyzer rules again and finish analysis rules on EXISTS_DEFAULTS, some of which contact the catalog unnecessarily. Some of those rules are unnecessary as they were already run before to get the value. Worse, it may cause exceptions if the executors are not configured properly to reach the catalog, such as: ``` Caused by: org.apache.spark.SparkException: Failed during instantiating constructor for catalog 'spark_catalog': org.apache.spark.sql.delta.catalog.DeltaCatalog. at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToInstantiateConstructorForCatalogError(QueryExecutionErrors.scala:2400) at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:84) at org.apache.spark.sql.connector.catalog.CatalogManager.loadV2SessionCatalog(CatalogManager.scala:72) at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$2(CatalogManager.scala:94) at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86) at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$1(CatalogManager.scala:94) at scala.Option.map(Option.scala:230) at org.apache.spark.sql.connector.catalog.CatalogManager.v2SessionCatalog(CatalogManager.scala:93) at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:55) at org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:130) at org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:101) at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:172) at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:169) at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.$anonfun$apply$1(Optimizer.scala:502) at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126) at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122) at scala.collection.immutable.List.foldLeft(List.scala:91) at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:502) at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:301) at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:266) at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:427) at scala.Option.map(Option.scala:230) at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$1(ResolveDefaultColumnsUtil.scala:425) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198) at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.getExistenceDefaultValues(ResolveDefaultColumnsUtil.scala:423) at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$existenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:498) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.existenceDefaultValues(ResolveDefaultColumnsUtil.scala:496) at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.existenceDefaultValues(ResolveDefaultColumnsUtil.scala) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:350) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:373) at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.$anonfun$apply$5(ParquetFileFormat.scala:441) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1561) at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:428) at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:258) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:639) ... 21 more Caused by: java.lang.IllegalStateException: No active or default Spark session found ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a test in StructTypeSuite. I had to expose for testing some members in ResolveDefaultColumns. ### Was this patch authored or co-authored using generative AI tooling? No Closes #49840 from szehon-ho/SPARK-51119. Authored-by: Szehon Ho <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit 937decc) Signed-off-by: Dongjoon Hyun <[email protected]>
According to the affected version, I merged this to master/4.0. |
Thanks @cloud-fan @dtenedor @dongjoon-hyun ! |
assert(!u.isInternal) | ||
FunctionRegistry.builtin.lookupFunction(FunctionIdentifier(u.nameParts.head), u.arguments) | ||
} match { | ||
case c: Cast if c.needsTimeZone => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the CAST can be nested inside array/map/struct, we should put this case match inside the transformUp
, together with case u: UnresolvedFunction
@szehon-ho can you make a followup PR for it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan sure, let me do that.
BTW, I looked a little bit and couldnt reproduce a failure with the current implementation using a following unit test with a nested cast:
test("SPARK-51119: array of timestamp should have timezone if default values castable") {
withTable("t") {
sql(s"CREATE TABLE t(key int, c ARRAY<STRING> DEFAULT " +
s"ARRAY(CAST(timestamp '2018-11-17' AS STRING))) " +
s"USING parquet")
sql("INSERT INTO t (key) VALUES(1)")
checkAnswer(sql("select * from t"), Row(1, Array("2018-11-17 00:00:00")))
}
}
Unlike the failing case of top-level cast:
test("SPARK-46958: timestamp should have timezone for resolvable if default values castable") {
val defaults = Seq("timestamp '2018-11-17'", "CAST(timestamp '2018-11-17' AS STRING)")
defaults.foreach { default =>
withTable("t") {
sql(s"CREATE TABLE t(key int, c STRING DEFAULT $default) " +
s"USING parquet")
sql("INSERT INTO t (key) VALUES(1)")
checkAnswer(sql("select * from t"), Row(1, "2018-11-17 00:00:00"))
}
}
}
EXISTS_DEFAULT is saved without a cast in the first case: ARRAY('2018-11-17 00:00:00')
(looks like it got evaluated)
and with a cast in the second case: CAST(TIMESTAMP '2018-11-17 00:00:00' AS STRING)
So I think in this particular scenario, it doesnt matter. But agree that it is better to have it, as we are making a generic method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm looking at the previous test failure
Cause: org.apache.spark.sql.AnalysisException: [INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION] Failed to execute command because the destination column or variable `c` has a DEFAULT value CAST(TIMESTAMP '2018-11-17 00:00:00' AS STRING), which fails to resolve as a valid expression. SQLSTATE: 42623
CAST(TIMESTAMP '2018-11-17 00:00:00' AS STRING)
can't be generated by Literal#sql
. Seems we have some misunderstanding about how this existing default string is generated. @szehon-ho can you take a closer look?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
synced offline, see the other comment.
@@ -320,6 +319,29 @@ object ResolveDefaultColumns extends QueryErrorsBase | |||
coerceDefaultValue(analyzed, dataType, statementType, colName, defaultSQL) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the CAST is added here, but it should be constant-folded before we generate the existing default string. We need to debug it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
synced with @cloud-fan offline, this is not constant folded after this line, when analyzing to create EXISTS_DEFAULT. So in the input of analyzeExistsDefault() , EXISTS_DEFAULT sometimes has a top level CAST
Synced with @cloud-fan offline, the current code should work for this case. Made follow up #49881 to do some cleanup to put the logic in the right place. |
…EFAULT should not call catalogs ### What changes were proposed in this pull request? Code cleanup for #49840. Literal#fromSQL should be the inverse of Literal#sql. The cast handling is an artifact of the calling ResolveDefaultColumns object that adds the cast when making EXISTS_DEFAULT, so its handling is moved to ResolveDefaultColumns as well. ### Why are the changes needed? Code cleanup ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #49881 from szehon-ho/SPARK-51119-follow. Authored-by: Szehon Ho <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…EFAULT should not call catalogs ### What changes were proposed in this pull request? Code cleanup for #49840. Literal#fromSQL should be the inverse of Literal#sql. The cast handling is an artifact of the calling ResolveDefaultColumns object that adds the cast when making EXISTS_DEFAULT, so its handling is moved to ResolveDefaultColumns as well. ### Why are the changes needed? Code cleanup ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #49881 from szehon-ho/SPARK-51119-follow. Authored-by: Szehon Ho <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 5135329) Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
Simplify the resolution of EXISTS_DEFAULT on ResolveDefaultColumns::getExistenceDefaultValues(), which are called from file readers on executors.
Why are the changes needed?
Spark executors unnecessary contacts catalogs when resolving EXISTS_DEFAULTS (used for default values for existing data) for a column.
Detailed explanation: The code path for default values first runs an analysis of the user-provided CURRENT_DEFAULT value for a column (to evaluate functions, etc), and uses the result sql to save as the column's EXISTS_DEFAULT. EXISTS_DEFAULT is then used to avoid having to rewrite existing data using backfill to fill this value in the files. When reading existing files, Spark then attempts to resolve the EXISTS_DEFAULT metadata and use the value for null values it finds in that column.
The problem is, this second step on read redundantly runs all the analyzer rules again and finish analysis rules on EXISTS_DEFAULTS, some of which contact the catalog unnecessarily. Some of those rules are unnecessary as they were already run before to get the value.
Worse, it may cause exceptions if the executors are not configured properly to reach the catalog, such as:
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added a test in StructTypeSuite. I had to expose for testing some members in ResolveDefaultColumns.
Was this patch authored or co-authored using generative AI tooling?
No